package com.dahuan.processfunction;

import com.dahuan.bean.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;


/**
 * Flink streaming example that splits a stream of {@link SensorReading}s into
 * two streams using a side output.
 *
 * <p>Text lines are read from a socket, parsed into {@code SensorReading}
 * objects, and then routed by temperature: readings above 30 stay on the main
 * output, every other reading is emitted to the {@code "low-temp"} side output.
 * Both streams are printed to standard output.
 */
public class Process_SideOutCase {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );

        final String hostname = "localhost";
        final int port = 8888;
        DataStream<String> inputStream = env.socketTextStream( hostname, port );

        // Each line is expected to hold three comma-separated fields that are passed,
        // in order, to the SensorReading constructor (a string, a long and a double).
        // A line with fewer fields or non-numeric values makes this mapper throw.
        DataStream<SensorReading> dataStream = inputStream.map( line -> {
            String[] fields = line.split( "," );
            // Long.valueOf / Double.valueOf replace the deprecated boxed-type constructors.
            return new SensorReading( fields[0], Long.valueOf( fields[1] ), Double.valueOf( fields[2] ) );
        } );


        // Tag identifying the low-temperature side output. It is created as an
        // anonymous subclass so that the element type is retained for Flink.
        OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>( "low-temp" ){};


        // Split the stream with a ProcessFunction: the main output carries the
        // high-temperature readings, the side output carries the rest.
        SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process( new ProcessFunction<SensorReading, SensorReading>() {

            @Override
            public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {

                // Readings strictly above 30 go to the main output; readings of
                // 30 or below are emitted to the low-temperature side output.
                if(value.getTemperature() > 30){
                    out.collect( value );
                }else{
                    ctx.output(lowTempTag,value);
                }
            }
        } );

        highTempStream.print( "high-temp" );
        highTempStream.getSideOutput( lowTempTag ).print("low-temp");

        env.execute( "Process_ApplicationCase" );

    }

}
